SortShuffleManager — The Default Shuffle System

SortShuffleManager is the one and only shuffle manager in Spark with the short name sort or tungsten-sort.
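The shuffle manager in use is configured with the spark.shuffle.manager Spark property (default: sort). A minimal sketch of setting it explicitly on a SparkConf (both short names resolve to this class; the application name is only for illustration):

```scala
import org.apache.spark.SparkConf

// Either short name selects org.apache.spark.shuffle.sort.SortShuffleManager
val conf = new SparkConf()
  .setAppName("sort-shuffle-demo")
  .set("spark.shuffle.manager", "sort") // or "tungsten-sort"
```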

SortShuffleManager uses an IndexShuffleBlockResolver (as the shuffleBlockResolver internal registry).

Table 1. SortShuffleManager Internal Registries and Counters

| Name | Description |
|------|-------------|
| numMapsForShuffle | Lookup table of the number of map tasks (numMaps) per shuffle id. A shuffle id is associated with its numMaps when getWriter is executed. |
| shuffleBlockResolver | IndexShuffleBlockResolver used when getWriter creates a ShuffleWriter. |

Tip

Enable DEBUG logging level for org.apache.spark.shuffle.sort.SortShuffleManager$ logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager$=DEBUG

Refer to Logging.

Creating SortShuffleManager Instance

SortShuffleManager takes a SparkConf.

SortShuffleManager makes sure that the spark.shuffle.spill Spark property is enabled. If it is not, you should see the following WARN message in the logs:

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
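A minimal sketch of that check as it could appear in SortShuffleManager's constructor (a simplified reconstruction of the Spark sources; the message is purely informational since spilling can no longer be disabled):

```scala
// Simplified reconstruction of the spark.shuffle.spill check at construction time
if (!conf.getBoolean("spark.shuffle.spill", defaultValue = true)) {
  logWarning("spark.shuffle.spill was set to false, but this configuration is ignored as of " +
    "Spark 1.6+. Shuffle will continue to spill to disk when necessary.")
}
```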
Note
SortShuffleManager is created when SparkEnv is created (on the driver and executors).

Getting ShuffleHandle — registerShuffle Method

registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle
Note
registerShuffle is a part of the ShuffleManager contract.

registerShuffle returns a new ShuffleHandle that is one of the following (see the sketch after the list):

  1. BypassMergeSortShuffleHandle (with ShuffleDependency[K, V, V]) when shouldBypassMergeSort condition holds.

  2. SerializedShuffleHandle (with ShuffleDependency[K, V, V]) when canUseSerializedShuffle condition holds.

  3. BaseShuffleHandle (with the input ShuffleDependency[K, V, C]) for all other cases.
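A minimal sketch of the selection logic, with the conditions checked in the order above (a simplified reconstruction of the Spark 2.x sources):

```scala
// Simplified reconstruction of SortShuffleManager.registerShuffle
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // Few reduce partitions and no map-side combine: write one file per partition and concatenate
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Records can be sorted in their serialized form (the tungsten-sort path)
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Regular, deserialized sort-based shuffle
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
```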

Returning ShuffleWriter For ShuffleHandle — getWriter Method

getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]
Note
getWriter is a part of the ShuffleManager contract.

Internally, getWriter makes sure that the shuffle id of the input ShuffleHandle is associated with its numMaps in the numMapsForShuffle internal registry.

Note
getWriter expects the input handle to be of type BaseShuffleHandle (despite the signature, which accepts any ShuffleHandle); the two specialized handles are its subclasses.

getWriter then returns a new ShuffleWriter that matches the type of the input ShuffleHandle (see the sketch after the list):

  1. UnsafeShuffleWriter for SerializedShuffleHandle.

  2. BypassMergeSortShuffleWriter for BypassMergeSortShuffleHandle.

  3. SortShuffleWriter for BaseShuffleHandle.
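Taken together, a minimal sketch of getWriter (a simplified reconstruction of the Spark 2.x sources; the exact writer constructor parameters may differ across Spark versions):

```scala
// Simplified reconstruction of SortShuffleManager.getWriter
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  // Record the number of map tasks writing shuffle output for this shuffle id
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      // Tungsten path: records are sorted in their serialized form
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      // Bypass path: one file per reduce partition, concatenated at the end
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      // Regular sort-based shuffle
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
```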

shouldBypassMergeSort Method

shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean

shouldBypassMergeSort holds (i.e. is positive) when the mapSideCombine flag of the input ShuffleDependency is disabled and the number of partitions (of the ShuffleDependency's Partitioner) is at most spark.shuffle.sort.bypassMergeThreshold Spark property (default: 200).

Otherwise, shouldBypassMergeSort is negative (i.e. false). In particular, it never holds when mapSideCombine is enabled, since map-side aggregation requires sorting records by key (and an aggregator has to be defined in that case).
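A minimal sketch of the check (a simplified reconstruction of the Spark sources, where shouldBypassMergeSort lives in SortShuffleWriter's companion object):

```scala
// Simplified reconstruction of SortShuffleWriter.shouldBypassMergeSort
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    // Map-side aggregation needs records sorted by key, so the bypass path is never used
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```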

Note
shouldBypassMergeSort is exclusively used when registerShuffle selects a ShuffleHandle.

Checking If SerializedShuffleHandle Can Be Used for ShuffleHandle — canUseSerializedShuffle Method

canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean

canUseSerializedShuffle condition holds (i.e. is positive) when all of the following hold (checked in that order):

  1. The Serializer of the input ShuffleDependency supports relocation of serialized objects.

  2. No Aggregator is defined for the input ShuffleDependency.

  3. The number of partitions (of the Partitioner of the input ShuffleDependency) is not greater than the maximum supported for the serialized shuffle mode (MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE, i.e. 16777216).

You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:

DEBUG Can use serialized shuffle for shuffle [id]

Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:

DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation

DEBUG Can't use serialized shuffle for shuffle [id] because an aggregator is defined

DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
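A minimal sketch of the checks and their DEBUG messages (a simplified reconstruction of the Spark sources):

```scala
// Simplified reconstruction of SortShuffleManager.canUseSerializedShuffle
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    logDebug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  } else if (dependency.aggregator.isDefined) {
    logDebug(s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
    false
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    logDebug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    logDebug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
```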
Note
canUseSerializedShuffle is exclusively used when registerShuffle selects a ShuffleHandle.

Settings

Table 2. Spark Properties

| Spark Property | Default Value | Description |
|----------------|---------------|-------------|
| spark.shuffle.sort.bypassMergeThreshold | 200 | Maximum number of partitions (of a ShuffleDependency's Partitioner) for which the bypass merge-sort shuffle path can be used (see shouldBypassMergeSort). |
| spark.shuffle.spill | true | No longer in use. When false, the WARN message below is printed in the logs when SortShuffleManager is created. |

WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
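For example, a minimal sketch of lowering the bypass threshold so that only shuffles with at most 50 partitions qualify for the bypass path (the value 50 is just for illustration):

```scala
import org.apache.spark.SparkConf

// Shuffles with more than 50 partitions no longer qualify for the bypass merge-sort path
val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "50")
```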
